package com.atguigu.flink.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

import static org.apache.flink.api.common.typeinfo.Types.*;


/**
 * Created by Smexy on 2023/11/7
 *
 * Operator-chaining demo: a socket word count whose {@code map} operator is explicitly
 * excluded from operator chaining.
 *
 * Operator chain: two adjacent upstream/downstream operators are chained together
 * and become a single operator.
 */
public class Demo8_OperatorChain
{
    /** Port the web UI is bound to (value written to {@code rest.port}). */
    private static final int WEB_UI_PORT = 3333;

    /** Host of the text socket the job reads lines from. */
    private static final String SOCKET_HOST = "hadoop102";

    /** Port of the text socket the job reads lines from. */
    private static final int SOCKET_PORT = 8888;

    /**
     * Builds and runs the pipeline:
     * socket source -> flatMap (line to (word, 1)) -> keyBy word -> sum -> map (identity) -> print.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {

        // Define the port the web UI binds to.
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", WEB_UI_PORT);

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Disable operator chaining globally.
        //environment.disableOperatorChaining();

        DataStreamSource<String> dataStreamSource1 = environment.socketTextStream(SOCKET_HOST, SOCKET_PORT);

        dataStreamSource1
            // Single shared tokenizer; the explicit type hint keeps the output type unambiguous.
            .flatMap(new MyWordCountFlatmapFunction2()).returns(TUPLE(STRING,INT))
            .keyBy(t -> t.f0)
            .sum(1)
            // Identity map; it exists only so there is an operator to apply chaining settings to.
            .map(x -> x).returns(TUPLE(STRING,INT))
            // Start a new chain beginning at map: map is not chained with the preceding operators.
            //.startNewChain()
            // map takes no part in chaining: it is chained with neither the preceding
            // nor the following operator.
            .disableChaining()
            .print();

        environment.execute();

    }

    /**
     * Splits each input line on single spaces and emits {@code (word, 1)} for every
     * non-empty token.
     */
    private static class MyWordCountFlatmapFunction2 implements FlatMapFunction<String, Tuple2<String,Integer>>
    {
        /**
         * @param value one line of input text
         * @param out   collector receiving one {@code (word, 1)} tuple per non-empty token
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            Arrays.stream(words)
                  // split(" ") yields "" for blank lines, leading spaces and repeated spaces;
                  // those are not words and must not be counted.
                  .filter(word -> !word.isEmpty())
                  .forEach(
                      word -> out.collect(Tuple2.of(word,1))
                  );
        }
    }
}
